27 实践课-多工具组合调用落地与优化
多工具组合调用落地与优化
关联:索引
术语小抄(初学者版)
-
接口异常(Interface Error):HTTP 500、连接失败、返回非 JSON、字段缺失等“工具对外接口不稳定”问题。
-
参数传递错误(Parameter Passing Error):字段名拼错、类型不匹配、单位/枚举值错误,导致工具报错或返回不可信结果。
-
规范化(Normalize):把不同工具输出统一到同一结构与字段口径(例如统一
ok/trace_id/ts_ms/data/error)。 -
异常结果过滤(Error Filtering):Join 时把“非关键分支失败”收纳到
warnings,不影响主链路,但必须留证据。 -
超时(Timeout):调用超过规定时间直接中止或降级,避免链路“卡死拖垮整体”。
-
重试(Retry):对“可恢复、幂等”的失败再尝试;必须有次数上限与退避(backoff)。
-
幂等(Idempotent):同一请求重复执行不会造成额外副作用(查询类通常幂等,控制类通常不幂等)。
-
熔断(Circuit Breaker):短时间内连续失败则暂时停止调用该工具,防止雪崩;等待恢复后再试探性恢复。
-
先修:建议完成 26(编排骨架与规范化返回)、20(兼容性与契约检查)、09(AI 协同调试主线)。
-
统一口径:工具输出必须结构化,至少包含
ok与trace_id;失败必须带可分类的错误码(error.code)。
复习卡片(与 26 对齐,只保留必要要点):
- 编排层(orchestrator)只负责:决定调用顺序/并发、决定失败分支、做 Join/生成下游入参、保留证据链。
- 控制类工具必须最后执行,并且必须有门禁(权限/阈值/参数范围/二次确认之一)。
- “多工具组合能跑通一次”不等于“可交付”。工业现场最常见的返工原因不是模型不聪明,而是接口不稳、参数不严、结果不统一、异常不可复验。
课程思政融入点(口径统一):
- 工业系统稳定性来自对细节的尊重:一次字段名拼写错误、一次超时未处理、一次异常未留证据,都可能让产线停摆。严谨不是“慢”,而是“少返工、可复验、可回归”的效率。
1)工具接口异常(工具“没按约定说话/根本没说话”)
- 连接失败:DNS/端口/网络断开、证书问题、ROS/DB 服务未启动。
- 超时:工具卡住或响应慢,导致编排层等待过久。
- 响应不可解析:返回非 JSON、编码错误、空响应。
- 返回不合约:缺
ok/trace_id,或失败时没有error.code/message。
2)参数传递错误(工具“被调用了,但喂错了东西”)
- 字段名错:
instruction写成insturction、cmd_id写成cmdId。 - 类型错:应为数字却传字符串;应为 list 却传 dict。
- 值域错:枚举值不在白名单;单位错(秒/毫秒、厘米/米)。
- 业务前置条件不满足:上游置信度不足、关键槽位缺失却继续下发控制。
- 先看“入参”:字段是否齐全、类型是否正确、值域是否合理。
- 再看“调用层”:是否有超时/重试/异常捕获;错误码是否可分类。
- 最后看“工具层”:工具是否启动、接口是否可达、版本是否一致、字段是否漂移(参考 20)。
1)异常结果过滤:主链路不被污染,但证据必须保留
- 关键分支失败:整体失败(fail-fast),返回可解释
final.reason,并保留steps。 - 非关键分支失败:整体仍可成功,但必须写入
warnings(含 tool、error.code、trace_id、ts_ms)。 - 对外输出(final)只给业务关键字段;证据链(steps/warnings)用于排障与回归。
2)格式标准化:Join 的前提是“所有结果可比、可测、可断言”
常见的“字段漂移”兼容策略(与 20 对齐):
success → ok:兼容解析,但对外统一只暴露ok。ts → ts_ms:统一毫秒;发现秒级时间戳要转换并留证据(建议写入data._meta或失败时写入error.detail._meta)。
把下面代码保存为 orchestrator_resilience_l1.py,然后运行。代码使用“故障注入桩工具”模拟真实世界的常见失败:异常抛出、超时、返回不合约、参数缺失。
from __future__ import annotations
import asyncio
import json
import random
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
# 代码说明:
# - 目标:演示“多工具组合调用”的落地优化骨架:标准化返回、异常捕获、超时、重试退避、warnings 降级、证据字段。
# - 原则:所有工具输出统一为 {ok, tool, trace_id, ts_ms, data, error},失败必须可分类(error.code)。
# - 备注:这里的 tool_xxx 都是“故障注入桩工具”,真实项目只需要替换工具函数体,尽量保留编排与规范化骨架。
def now_ms() -> int:
# 统一生成毫秒时间戳:用于排序、审计、回放定位(避免秒/毫秒混用)
return int(time.time() * 1000)
def ok_result(*, tool: str, trace_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
# 统一成功返回:ok 只表示“这一步调用成功”,不等价于“业务完成”
return {"ok": True, "tool": tool, "trace_id": trace_id, "ts_ms": now_ms(), "data": data, "error": None}
def err_result(
*,
tool: str,
trace_id: str,
code: str,
message: str,
detail: Optional[Dict[str, Any]] = None,
retryable: bool = False,
) -> Dict[str, Any]:
# 统一失败返回:
# - code:可统计/可回归断言的错误码
# - message:给人看的简短原因
# - detail:补充证据(缺字段、期望类型、阈值等),便于复现与修复
# - retryable:是否允许重试(通常查询类可重试;控制类默认不重试)
return {
"ok": False,
"tool": tool,
"trace_id": trace_id,
"ts_ms": now_ms(),
"data": {},
"error": {"code": code, "message": message, "detail": detail or {}, "retryable": retryable},
}
def require_fields(obj: Dict[str, Any], fields: List[str]) -> Tuple[bool, List[str]]:
# 必填字段检查:把“KeyError 类异常”前置为结构化错误,减少联调噪声
missing = [f for f in fields if obj.get(f) in (None, "", [])]
return (len(missing) == 0), missing
def _to_obj(x: Any) -> Dict[str, Any]:
# 工具输出兼容:允许工具返回 dict 或 JSON 字符串
if isinstance(x, dict):
return x
if isinstance(x, str):
return json.loads(x)
raise TypeError(f"unsupported tool output type: {type(x)}")
def normalize_tool_result(raw: Any, *, tool: str, trace_id: str) -> Dict[str, Any]:
# 核心:把各种“奇形怪状的工具输出”标准化成统一结构
# - 目的:让 Join/回归测试/审计都能用同一套断言口径
try:
obj = _to_obj(raw)
except Exception as e:
return err_result(tool=tool, trace_id=trace_id, code="RESP_NOT_JSON", message="tool response is not valid json", detail={"err": str(e)})
# meta 用于记录“兼容转换/注入行为”,避免学生误以为工具本身天然返回了这些字段
meta: Dict[str, Any] = {}
if "ok" not in obj and "success" in obj:
# 兼容字段漂移:success → ok(对外只暴露 ok)
obj["ok"] = bool(obj["success"])
meta["ok_compat_from_success"] = True
if "ts_ms" not in obj and "ts" in obj:
# 兼容时间戳漂移:ts(秒/毫秒不确定)→ ts_ms(统一毫秒)
ts = obj["ts"]
if isinstance(ts, (int, float)) and ts < 10_000_000_000:
obj["ts_ms"] = int(ts * 1000)
meta["ts_converted_from_s"] = True
else:
obj["ts_ms"] = int(ts)
if "trace_id" not in obj:
# 工具未返回 trace_id:编排层注入(不推荐,但现实常见)
obj["trace_id"] = trace_id
meta["trace_id_injected"] = True
# 兜底:保证 tool/ts_ms/data 形态稳定,避免后续 Join 写一堆 if/else
obj["tool"] = obj.get("tool") or tool
obj["ts_ms"] = int(obj.get("ts_ms") or now_ms())
obj["data"] = obj.get("data") if isinstance(obj.get("data"), dict) else {}
ok = bool(obj.get("ok"))
if ok:
# 成功:把 meta 写入 data._meta(可选证据字段),方便复盘兼容转换发生了什么
obj["error"] = None
data = dict(obj["data"])
if meta:
data["_meta"] = meta
return {"ok": True, "tool": obj["tool"], "trace_id": obj["trace_id"], "ts_ms": obj["ts_ms"], "data": data, "error": None}
err = obj.get("error")
if not isinstance(err, dict):
# 失败但 error 不是对象:统一成可断言的结构化错误
err = {"code": "RESP_ERROR_SHAPE", "message": "tool error is not an object", "detail": {"raw_error": str(obj.get("error"))}}
if "code" not in err or "message" not in err:
# 失败但缺少 error.code/message:统一归类为 RESP_ERROR_SHAPE
err = {"code": "RESP_ERROR_SHAPE", "message": "missing error.code/message", "detail": {"raw_error": err}}
err.setdefault("detail", {})
if meta:
# 失败:把 meta 写入 error.detail._meta(便于解释“为何这次失败/为何这次被注入”)
err["detail"].setdefault("_meta", {}).update(meta)
err.setdefault("retryable", False)
return {"ok": False, "tool": obj["tool"], "trace_id": obj["trace_id"], "ts_ms": obj["ts_ms"], "data": {}, "error": err}
async def call_with_timeout(coro, *, timeout_s: float, tool: str, trace_id: str) -> Dict[str, Any]:
# 把“超时/异常抛出”变成结构化结果,避免编排层直接崩溃
try:
raw = await asyncio.wait_for(coro, timeout=timeout_s)
return normalize_tool_result(raw, tool=tool, trace_id=trace_id)
except asyncio.TimeoutError:
return err_result(tool=tool, trace_id=trace_id, code="TIMEOUT", message=f"tool timeout after {timeout_s}s", retryable=True)
except Exception as e:
return err_result(tool=tool, trace_id=trace_id, code="EXCEPTION", message="tool raised exception", detail={"err": repr(e)}, retryable=True)
async def retry_call(
*,
call_fn,
tool: str,
trace_id: str,
timeout_s: float,
retries: int,
base_backoff_ms: int,
) -> Dict[str, Any]:
# 统一重试 wrapper:
# - 只对 retryable=true 的失败重试
# - 指数退避 + 随机抖动,避免高并发场景“集体重试雪崩”
# - 控制类工具建议默认不重试(除非你能证明幂等,并且回执可对齐)
last: Optional[Dict[str, Any]] = None
for i in range(retries + 1):
out = await call_with_timeout(call_fn(), timeout_s=timeout_s, tool=tool, trace_id=trace_id)
last = out
if out["ok"]:
return out
retryable = bool((out.get("error") or {}).get("retryable"))
if (not retryable) or i == retries:
return out
backoff_ms = base_backoff_ms * (2**i) + random.randint(0, 80)
await asyncio.sleep(backoff_ms / 1000)
return last or err_result(tool=tool, trace_id=trace_id, code="UNKNOWN", message="unexpected retry loop end")
async def tool_parse_instruction(*, instruction: str, trace_id: str) -> Dict[str, Any]:
# 工具桩 1:解析指令(故障注入)
# 真实项目可替换为:意图识别/槽位抽取/参数标准化
tool = "parse_instruction"
if not isinstance(instruction, str):
return err_result(
tool=tool,
trace_id=trace_id,
code="TYPE_ERROR",
message="instruction must be a string",
detail={"expect": "str", "got": type(instruction).__name__},
retryable=False,
)
if not instruction.strip():
return err_result(tool=tool, trace_id=trace_id, code="INPUT_EMPTY", message="instruction is empty", retryable=False)
await asyncio.sleep(0.05)
if "字段错" in instruction:
return {"success": False, "error": "bad shape"} # 故意返回不合约:用于演示 normalize
return ok_result(tool=tool, trace_id=trace_id, data={"action": "place", "target_bin": "bin_a"})
async def tool_query_kb(*, query: str, trace_id: str) -> Dict[str, Any]:
# 工具桩 2:知识库检索(故障注入)
# 真实项目可替换为:向量库/数据库/规则库查询;通常属于“可降级”的非关键分支
tool = "kb_search"
if not isinstance(query, str):
return err_result(
tool=tool,
trace_id=trace_id,
code="TYPE_ERROR",
message="query must be a string",
detail={"expect": "str", "got": type(query).__name__},
retryable=False,
)
await asyncio.sleep(0.15)
if "超时" in query:
await asyncio.sleep(5)
if "抛异常" in query:
raise RuntimeError("kb backend crashed")
return ok_result(tool=tool, trace_id=trace_id, data={"hits": [{"id": "doc-01", "score": 0.76}]})
async def tool_arm_control(*, action: str, target_bin: str, trace_id: str) -> Dict[str, Any]:
# 工具桩 3:设备控制(示例)
# 关键原则:控制类工具必须最后执行,并且要严格做参数校验与安全拒绝
tool = "arm_control"
ok1, missing = require_fields({"action": action, "target_bin": target_bin}, ["action", "target_bin"])
if not ok1:
return err_result(tool=tool, trace_id=trace_id, code="INPUT_MISSING", message="missing required fields", detail={"missing": missing})
if action not in {"place"}:
return err_result(
tool=tool,
trace_id=trace_id,
code="VALUE_ERROR",
message="unsupported action",
detail={"action": action, "allow": ["place"]},
retryable=False,
)
await asyncio.sleep(0.05)
if target_bin == "bin_unknown":
return err_result(tool=tool, trace_id=trace_id, code="SAFETY_REJECT", message="unsafe target_bin", retryable=False)
return ok_result(tool=tool, trace_id=trace_id, data={"ack": True, "cmd_id": "c-001"})
@dataclass
class PipelineOutput:
ok: bool
trace_id: str
steps: List[Dict[str, Any]]
final: Dict[str, Any]
warnings: List[Dict[str, Any]]
def to_dict(self) -> Dict[str, Any]:
return {"ok": self.ok, "trace_id": self.trace_id, "steps": self.steps, "final": self.final, "warnings": self.warnings}
async def orchestrate(*, instruction: str, kb_query: str) -> PipelineOutput:
# 编排主函数:
# - 关键分支:parse → control(失败则 fail-fast)
# - 非关键分支:kb_search(失败则写入 warnings,不阻断主链路)
trace_id = uuid.uuid4().hex[:8]
steps: List[Dict[str, Any]] = []
warnings: List[Dict[str, Any]] = []
# Step 1(关键):解析
parse = await retry_call(
call_fn=lambda: tool_parse_instruction(instruction=instruction, trace_id=trace_id),
tool="parse_instruction",
trace_id=trace_id,
timeout_s=1.0,
retries=0,
base_backoff_ms=100,
)
steps.append(parse)
if not parse["ok"]:
return PipelineOutput(ok=False, trace_id=trace_id, steps=steps, final={"reason": "parse_failed", "error": parse["error"]}, warnings=warnings)
# Step 2(非关键):检索(失败允许降级,但必须留证据)
kb = await retry_call(
call_fn=lambda: tool_query_kb(query=kb_query, trace_id=trace_id),
tool="kb_search",
trace_id=trace_id,
timeout_s=0.3,
retries=2,
base_backoff_ms=120,
)
steps.append(kb)
if not kb["ok"]:
warnings.append({"tool": kb["tool"], "trace_id": kb["trace_id"], "ts_ms": kb["ts_ms"], "error": kb["error"]})
# Step 3(关键):控制(示例中默认不重试)
control = await retry_call(
call_fn=lambda: tool_arm_control(action=str(parse["data"]["action"]), target_bin=str(parse["data"]["target_bin"]), trace_id=trace_id),
tool="arm_control",
trace_id=trace_id,
timeout_s=1.0,
retries=0,
base_backoff_ms=100,
)
steps.append(control)
if not control["ok"]:
return PipelineOutput(ok=False, trace_id=trace_id, steps=steps, final={"reason": "control_failed", "error": control["error"]}, warnings=warnings)
# 对外 final:只保留业务关键字段;证据链放在 steps/warnings
final = {"action": parse["data"]["action"], "target_bin": parse["data"]["target_bin"], "cmd_id": control["data"]["cmd_id"]}
if kb["ok"]:
final["kb_hit_count"] = len(kb["data"].get("hits") or [])
return PipelineOutput(ok=True, trace_id=trace_id, steps=steps, final=final, warnings=warnings)
def main():
# 三组演示用例:
# 1)全成功
# 2)parse 返回不合约 + kb 抛异常(演示 normalize + EXCEPTION)
# 3)kb 超时(演示 TIMEOUT + warnings 降级)
out = asyncio.run(orchestrate(instruction="把苹果放到A格口", kb_query="查询分拣规则"))
print(json.dumps(out.to_dict(), ensure_ascii=False, indent=2))
out2 = asyncio.run(orchestrate(instruction="字段错:把苹果放到A格口", kb_query="抛异常"))
print(json.dumps(out2.to_dict(), ensure_ascii=False, indent=2))
out3 = asyncio.run(orchestrate(instruction="把苹果放到A格口", kb_query="超时"))
print(json.dumps(out3.to_dict(), ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()
逐段解释与自检要点:
normalize_tool_result:把“dict/JSON 字符串/字段漂移/错误形态不一致”统一成可 Join、可断言的结果结构;失败时强制补齐error.code/message。data._meta / error.detail._meta:记录兼容转换与注入行为(例如ts秒→毫秒、success→ok、补齐 trace_id),方便复盘“为什么这次能跑通/为什么这次被降级”。call_with_timeout:把“超时/异常抛出”变成结构化错误(TIMEOUT/EXCEPTION),避免编排层直接崩溃。retry_call:只对retryable=true且“工具本身幂等”的失败重试,并带指数退避与随机抖动(防止同一时间一起重试造成雪崩);控制类工具建议默认retryable=false。warnings:演示“非关键分支失败可降级”,但必须保留证据字段(tool/trace_id/ts_ms/error)。- 自检:运行 3 次输出,观察
warnings在“KB 超时/抛异常”时是否被填充;观察“字段错”时是否被 normalize 成结构化错误而不是程序崩溃。
运行命令(在含 orchestrator_resilience_l1.py 的目录执行):
python orchestrator_resilience_l1.py
命令解释与自检要点:
python orchestrator_resilience_l1.py:执行 3 组案例(正常/返回不合约/超时与异常),每次输出一段 JSON。- 自检 1:失败时是否仍有
trace_id且steps保留证据。 - 自检 2:是否出现“未捕获异常导致程序退出”(不应出现)。
- 自检 3:
warnings是否只承载非关键分支的失败,不污染final的关键字段。
目标(分组):
- 把你们现有的多工具编排代码做一次“落地优化”:为至少 2 个工具加入标准化返回 + 超时处理;为 Join 加入异常过滤(warnings);复现并修复至少 1 个参数传递错误。
步骤(建议):
- 选定你们链路中的 2 个“高频失败点工具”(例如 KB 检索、ROS 控制、数据库查询)。
- 给每个工具加一层统一 wrapper:异常捕获、超时、规范化返回(字段漂移兼容)。
- 在编排 Join 处实施过滤:关键分支失败 fail-fast;非关键分支失败写入 warnings。
- 写 2 条错例:字段缺失、类型错误或单位错误,并确保错误码能被归类(INPUT_MISSING/TYPE_ERROR/UNIT_ERROR 等)。
- 给“参数错误提示”定口径:至少能回答三件事——缺哪个字段、期望类型是什么、允许范围/枚举有哪些(写进
error.detail)。
-
能打印一份结构化 JSON,包含:
ok/trace_id/steps/final/warnings。 -
至少复现并修复 1 类问题:参数传递错误或接口异常,且能用
trace_id串联证据。 -
复杂场景下的容错不是“放过错误”,而是把错误变成可控行为:该拒绝就拒绝、该降级就降级、该熔断就熔断、且每一种行为都能用数据回归验证。
1)先做“策略表”:不同类型工具,容错策略不同
- 查询类工具(KB/DB/规则查询):可超时、可重试、可降级(返回空结果+warnings),通常幂等。
- 计算类工具(解析/识别/分类):可超时、可重试(少量)、可回退(改用规则/低精度模型),但要保留置信度与来源。
- 控制类工具(ROS2/PLC/设备动作):原则上不重试(或只在“已知幂等且有回执对齐”的前提下重试),必须门禁,必要时支持“干跑(dry-run)/二次确认”。
2)容错模式四件套(落地可写成代码)
- 超时(Timeout)
- 目标:避免链路卡死。
- 落地:对每个工具配置独立超时,不用“一刀切”全局超时。
- 有条件重试(Retry with backoff)
- 目标:对可恢复故障(网络抖动/偶发超时)提高成功率。
- 落地:只对
retryable=true且工具幂等的调用重试;控制类默认不重试。
- 降级(Fallback / Degrade)
- 目标:非关键能力失败时仍能产出可用结果。
- 落地:把失败写进 warnings,并给出“降级的业务解释”(例如“本次未检索到规则文档,已按默认安全策略执行/已拒绝控制并提示补充信息”)。
- 熔断(Circuit Breaker)
- 目标:当某个工具持续失败时,短时间内停止调用它,保护系统整体。
- 落地:连续失败达到阈值进入 OPEN;等待一段时间进入 HALF_OPEN 试探恢复;成功则恢复 CLOSED。
- 补偿/回滚(Compensation)
- 目标:当“部分步骤已成功、后续失败”时,通过补偿动作把系统拉回可控状态(不是简单把时间倒回)。
- 落地:把“补偿动作”当作独立工具(例如撤销订单/标记任务失败/写入告警工单);控制类动作必须依赖回执(cmd_id/receipt)才能做补偿或人工介入。
下面代码片段可直接粘贴进你们的编排层(把 call_fn 换成真实工具调用即可)。
import time
from dataclasses import dataclass
from typing import Callable, Dict, Optional
@dataclass
class CircuitBreaker:
# 熔断器的最小状态机:
# - CLOSED:正常放行
# - OPEN:拒绝请求(直接走降级/兜底),等待 reset_timeout_s
# - HALF_OPEN:试探性放行 1 次(或少量次数),成功则恢复 CLOSED,失败则回到 OPEN
failure_threshold: int
reset_timeout_s: float
state: str = "CLOSED" # CLOSED | OPEN | HALF_OPEN
consecutive_failures: int = 0
opened_at: Optional[float] = None
def allow_request(self) -> bool:
# 判断“当前是否允许调用工具”
if self.state == "CLOSED":
return True
if self.state == "OPEN":
if self.opened_at is None:
return False
if (time.time() - self.opened_at) >= self.reset_timeout_s:
# 到时间后进入 HALF_OPEN:允许一次试探调用
self.state = "HALF_OPEN"
return True
return False
return True
def record_success(self) -> None:
# 一旦成功:清空连续失败并关闭熔断
self.state = "CLOSED"
self.consecutive_failures = 0
self.opened_at = None
def record_failure(self) -> None:
# 连续失败达到阈值:打开熔断并记录打开时间
self.consecutive_failures += 1
if self.consecutive_failures >= self.failure_threshold:
self.state = "OPEN"
self.opened_at = time.time()
def call_with_circuit_breaker(
*,
breaker: CircuitBreaker,
call_fn: Callable[[], Dict],
on_open_fallback: Callable[[], Dict],
) -> Dict:
# 熔断包装器:
# - 若熔断为 OPEN 且未到恢复时间:不调用工具,直接走 fallback(并写入 warnings/错误码)
if not breaker.allow_request():
return on_open_fallback()
out = call_fn()
if bool(out.get("ok")):
breaker.record_success()
return out
breaker.record_failure()
return out
逐段解释与自检要点:
-
failure_threshold/reset_timeout_s:阈值要按工具特性设置,避免“一两次失败就熔断”。 -
on_open_fallback:熔断时必须返回结构化结果(通常ok=false + error.code=CIRCUIT_OPEN,并写入 warnings),不能让程序崩溃。 -
自检:把 breaker 的状态写进
steps[*].data._meta(成功)或error.detail._meta(失败),或写入warnings,便于解释“为什么本次没调用工具而是直接降级”。 -
提醒:如果你们的工具调用是 async(例如
await tool_xxx(...)),可把call_with_circuit_breaker改成async def并在内部await call_fn(),其余状态机逻辑不变。 -
失败样本:1 条完整输入(用户原句/上游数据)+ 输出 JSON(steps/final/warnings)+ trace_id。
-
代码范围:只贴“调用处 + 工具签名 + normalize/Join”相关代码,不要全仓库乱贴。
-
预期行为:你希望是“拒绝/降级/重试/熔断”哪一种,以及理由。
AI 容错方案生成模板(可直接复制):
你是工业级编排与容错工程师。请基于我的现状,生成“可落地的容错设计方案 + 最小代码改动”,要求:
1)先输出策略表:每个工具属于查询/计算/控制哪类?允许超时/重试/降级/熔断吗?理由是什么?
2)给出错误码分类建议(不超过 12 个 code),并说明每个 code 的触发条件与建议处理方式(fail-fast / retry / fallback)。
3)给出最小补丁代码:实现超时、重试退避、warnings、(可选)熔断;不要引入新依赖。
4)给出 4 条回归用例(2 正常 + 2 异常),每条必须能复验(输入→期望行为→断言点)。
现有工具列表与签名:{粘贴}
现有编排代码片段:{粘贴}
失败证据(含 trace_id 的输出):{粘贴}
AI 输出审计清单(必须做):
- 是否违反“控制类工具必须最后执行 + 默认不重试”的安全原则。
- 是否篡改了你们既定字段口径(导致兼容性问题)。
- 是否给出能复验的用例与断言点(error.code、warnings、steps 顺序/包含关系、trace_id)。
建议的最小异常标注格式(JSONL:一行一个样本),保存为 anomaly_labels.jsonl:
{"id":"a-001","input":{"instruction":"把苹果放到A格口","kb_query":"超时"},"expect":{"behavior":"DEGRADE","warnings_contains":["TIMEOUT"],"final_required_keys":["action","target_bin"]}}
{"id":"a-002","input":{"instruction":"","kb_query":"查询分拣规则"},"expect":{"behavior":"FAIL_FAST","error_code":"INPUT_EMPTY","failed_step":"parse_instruction"}}
{"id":"a-003","input":{"instruction":"字段错:把苹果放到A格口","kb_query":"查询分拣规则"},"expect":{"behavior":"FAIL_FAST","error_code":"RESP_ERROR_SHAPE","failed_step":"parse_instruction"}}
{"id":"a-004","input":{"instruction":"把苹果放到A格口","kb_query":"抛异常"},"expect":{"behavior":"DEGRADE","warnings_contains":["EXCEPTION"],"final_required_keys":["cmd_id"]}}
字段解释与自检要点:
behavior:期望行为只能从有限集合里选(FAIL_FAST / DEGRADE / RETRY_SUCCEED / CIRCUIT_OPEN),避免“期望描述太泛”导致无法回归。warnings_contains:用于断言降级是否真的发生(例如 TIMEOUT/EXCEPTION 是否出现在 warnings)。final_required_keys:用于断言“降级后主链路仍产出关键字段”。failed_step:建议与steps[*].tool的值保持一致(例如parse_instruction/kb_search/arm_control),方便一眼定位失败发生在哪一步。
扩展样本(可选,完成“熔断接入”后再加进 JSONL):
{"id":"a-005","input":{"instruction":"把苹果放到A格口","kb_query":"查询分拣规则"},"expect":{"behavior":"CIRCUIT_OPEN","warnings_contains":["CIRCUIT_OPEN"]}}
可选:异常标注数据的回归脚本骨架(标准库即可)
import json
def load_jsonl(path: str):
# 读取 JSONL:每行一个样本,适合持续追加异常案例做回归
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
yield json.loads(line)
def has_warning(warnings, codes):
# 把 warnings 里的 error.code 抽成集合,再做“包含关系”断言
# 目的:避免用字符串 contains 产生误判
codes = set(codes or [])
seen = {(w.get("error") or {}).get("code") for w in (warnings or [])}
return codes.issubset(seen)
def main():
# 约定:你已把第一课时代码保存为 orchestrator_resilience_l1.py
# 并且其中有 async orchestrate(instruction, kb_query) 返回 to_dict() 的结构
from orchestrator_resilience_l1 import orchestrate
import asyncio
for case in load_jsonl("anomaly_labels.jsonl"):
cid = case["id"]
inp = case["input"]
exp = case["expect"]
# 运行一条样本:输出结构里包含 ok/final/warnings/steps/trace_id
out = asyncio.run(orchestrate(instruction=inp.get("instruction", ""), kb_query=inp.get("kb_query", ""))).to_dict()
ok = bool(out.get("ok"))
final = out.get("final") or {}
warnings = out.get("warnings") or []
behavior = exp.get("behavior")
if behavior == "FAIL_FAST":
# FAIL_FAST:关键分支失败,整体必须失败,且 error.code 要能对上标注期望
assert ok is False, (cid, "should fail-fast")
if "error_code" in exp:
got = ((final.get("error") or {}).get("code"))
assert got == exp["error_code"], (cid, got, exp["error_code"])
elif behavior == "DEGRADE":
# DEGRADE:非关键分支失败允许降级,但必须留下 warnings 证据,且 final 仍包含关键字段
if "warnings_contains" in exp:
assert has_warning(warnings, exp["warnings_contains"]), (cid, warnings)
for k in exp.get("final_required_keys") or []:
assert k in final, (cid, "missing final key", k)
else:
raise ValueError(f"unknown behavior: {behavior} | case={cid}")
print("all anomaly labels passed")
if __name__ == "__main__":
main()
逐段解释与自检要点:
load_jsonl:按行读取 JSONL,保证“新增一条异常样本就能回归一条用例”。has_warning:把 warnings 里的error.code抽出来做集合断言,避免写字符串包含判断导致误判。FAIL_FAST/DEGRADE:把“期望行为”固化为可断言的有限集合,减少争论与漂移;完成熔断接入后再扩展CIRCUIT_OPEN分支与用例。- 自检:当你修复了一类异常,至少要让对应 case 从 Fail 变成 Pass,并把修复点写进提交说明。
落地建议(每组至少做到 1 条):
-
把你们真实项目里最常见的 2 类异常做成标注样本(例如“缺字段”“超时”),并写入期望行为。
-
修复后用同一批异常样本回归,保证“修复有效且不反复”。
-
工程素养的核心是:对不确定性负责。把异常当作系统的一部分来设计,而不是“等出事再修”。
-
工业场景的稳定性来自“可解释、可追溯、可回归”的纪律:每一次容错策略都要能被证明有效,每一次修复都要能被复验。
课后作业(布置)
1)提交调试后的工具调用代码、运行成功截图(含多工具调用完整流程)。
2)提交容错设计方案与对应的代码实现,附 150 字左右说明,阐述优化效果。
3)提交标注异常场景数据的使用记录,说明如何基于数据完善逻辑。
Markdown 与代码自检清单(提交前自查)
- 标题层级是否连续;列表缩进是否正确;代码块是否成对闭合且语言标签正确。
- 代码是否能独立运行(复制到文件后执行不报语法错);命令是否拼写正确。
- 输出是否包含最小证据字段:
trace_id、steps、失败时error.code或warnings。 - 控制类工具是否最后执行且有门禁;是否避免了对控制类工具的盲目重试。